Dask: why is memory usage blowing up?

I have a small dataframe (around 100MB) and an expensive computation I want to perform on every row. The computation is not vectorizable; it requires some parsing and a database lookup for each row.

So I decided to try Dask to parallelize the task. The task is embarrassingly parallel, and the order of execution or re-execution is not an issue. However, for some unknown reason memory usage blows up to about 100GB.

Here is the problematic code sample:

import pandas as pd
import numpy as np
import dask.dataframe as dd

from dask.distributed import Client
from dask_jobqueue import LSFCluster

# 16 single-core LSF workers with 6GB of memory each
cluster = LSFCluster(memory="6GB", cores=1, project='gRNA Library Design')
cluster.scale(jobs=16)
client = Client(cluster)

# ~1.5GB lookup dictionary needed by the per-row computation
required_dict = load_big_dict()
score_guide = lambda row: expensive_computation(required_dict, row)

library_df = pd.read_csv(args.library_csv)

# output schema for the row-wise apply: the input columns plus the new ones
meta = library_df.dtypes
meta = meta.append(pd.Series({
    'specificity': np.dtype('int64'),
    'cutting_efficiency': np.dtype('int64'),
    '0 Off-targets': np.dtype('object'),
    '1 Off-targets': np.dtype('object'),
    '2 Off-targets': np.dtype('object'),
    '3 Off-targets': np.dtype('object')}))

library_ddf = dd.from_pandas(library_df, npartitions=32)
library_ddf = library_ddf.apply(score_guide, axis=1, meta=meta)
library_ddf = library_ddf.compute()
library_ddf = library_ddf.drop_duplicates()
library_ddf.to_csv(args.outfile, index=False)

My guess is that the big dictionary needed for the lookups is somehow the problem, but it is only ~1.5GB in total and is not included in the result dataframe.

Why is Dask blowing up memory usage?

Not 100% sure this will solve the problem in this case, but you can try futurizing the dictionary:

# broadcasting makes sure that every worker has a copy
[fut_dict] = client.scatter([required_dict], broadcast=True)
score_guide = lambda row: expensive_computation(fut_dict, row)

What this does is place a copy of the dictionary on every worker and store a reference to that object in fut_dict, avoiding the need to hash the large dictionary on every call to the function:

Every time you pass a concrete result (anything that isn’t delayed) Dask will hash it by default to give it a name. This is fairly fast (around 500 MB/s) but can be slow if you do it over and over again. Instead, it is better to delay your data as well.
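To make the quoted advice concrete, here is a minimal sketch of the "delay your data" pattern. It reuses required_dict, expensive_computation and library_df from the question; the per-row delayed pipeline is only an illustration, not part of the original answer:

import dask

# Wrap the big dictionary once: it is hashed a single time and gets a
# single key in the graph, which every task below reuses.
delayed_dict = dask.delayed(required_dict)

# One delayed task per row, all sharing the same delayed dictionary.
tasks = [dask.delayed(expensive_computation)(delayed_dict, row)
         for _, row in library_df.iterrows()]

results = dask.compute(*tasks)

For an object this large, scattering it to the workers (as shown above) is usually still preferable, since delayed data is embedded in the task graph and has to pass through the scheduler.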

Note that the broadcast will take up a portion of each worker's memory (e.g., based on your numbers, each worker will hold about 1.5GB for the dictionary). You can read more in the Dask documentation.

The problem is that required_dict needs to be serialized and sent to all the workers. Since required_dict is large and many workers need it at the same time, repeated serialization leads to a huge memory blowup.

There are many possible fixes; for me the simplest was to load the dictionary from within the workers and explicitly use map_partitions instead of apply.

Here is the solution in code:

    def do_df(df):
        # load the dictionary inside the worker, so it is never
        # serialized and shipped from the client
        required_dict = load_big_dict()
        score_guide = lambda row: expensive_computation(required_dict, row)
        return df.apply(score_guide, axis=1)

    library_ddf = dd.from_pandas(library_df, npartitions=128)
    library_ddf = library_ddf.map_partitions(do_df)
    library_ddf = library_ddf.compute()
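
A possible refinement (not part of the original answer, and assuming load_big_dict always returns the same data): with 128 partitions the dictionary is reloaded once per partition, so if loading is slow you can cache it per worker process, for example with functools.lru_cache:

from functools import lru_cache

@lru_cache(maxsize=1)
def get_big_dict():
    # runs once per worker process; later partitions on the same
    # worker reuse the cached dictionary
    return load_big_dict()

def do_df(df):
    required_dict = get_big_dict()
    score_guide = lambda row: expensive_computation(required_dict, row)
    return df.apply(score_guide, axis=1)

Each worker process keeps its own cache, so the memory cost stays at one copy of the dictionary per worker, the same as the broadcast approach in the other answer.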