Dask 数据帧崩溃
Dask dataframe crashes
我正在使用 Dask 加载大型镶木地板数据框,但似乎无法对其执行任何操作,除非系统崩溃或出现一百万个错误且没有输出。
压缩后的数据约为 165M,或者在 pandas 中加载后为 13G(它非常适合可用的 45G RAM)。
import pandas as pd
df = pd.read_parquet('data_simulated.parquet')
df.memory_usage(deep=True).sum() * 1e-9
# returns 13.09
df.head()
# prints the head of the dataframe properly
相反,如果使用 Dask
from dask.distributed import Client
import dask.dataframe as dataframe
client = Client()
# prints: <Client: 'tcp://127.0.0.1:38576' processes=7 threads=28, memory=48.32 GB>
df = dataframe.read_parquet('data_simulated.parquet')
df.memory_usage(deep=True).sum().compute() * 1e-9
打印
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
[a large traceback]
KilledWorker: ("('series-groupby-sum-chunk-memory_usage-dc8dab46de985e36d76a85bf3abaccbf', 0)", <Worker 'tcp://127.0.0.1:36882', name: 2, memory: 0, processing: 1>)
如果我尝试执行 df.head()、df.set_index(...) 或任何其他实际计算数据帧上的任何内容的操作,也会发生同样的情况。我试过减少工人的数量,这样每个人都有更多的内存。我也尝试过重新分区数据框,但它也因同样的错误而失败。如果我将客户端 LocalCluster 上的 memory_limit 设置为零,系统就会完全崩溃。
我做错了什么?
编辑:
这是有关数据的一些额外信息(通过 Pandas 加载得到)
In [2]: print(df.dtypes)
market_id uint32
choice_id uint64
attribute_1 bool
attribute_2 bool
attribute_3 bool
income float32
is_urban bool
distance float32
weight float32
quarter uint32
product_id int64
price float64
size float32
share float32
market_quarter int64
product_type object
outside_option int64
dtype: object
In [3]: print(df.shape)
(89429613, 17)
对象 product_type 是一个字符串。
Dask 通过分块加载和处理数据来工作。在 parquet 的情况下,该分块的起源来自数据文件本身:在内部 parquet 被组织成“行组”,即要一起读取的行集。
听起来在这种情况下,整个数据集由一个文件中的一个行组组成。这意味着 Dask 没有机会将数据拆分成块;你得到一个任务,它承担了一个工作人员的全部内存压力(可能等于总数据大小加上一些临时值),它只分配了总系统内存的一部分。因此出现错误。
请注意,您可以关闭内存监控以防止工作人员在 configuration 中或直接使用 memory_limit=0
等关键字被杀死。在这种情况下,您知道只有一名工作人员会执行负载。
在某些非常特殊的情况下(没有 nesting/list/map 类型),可以拆分行组,但是不存在用于此的代码,并且由于压缩和编码而效率低下数据。
我正在使用 Dask 加载大型镶木地板数据框,但似乎无法对其执行任何操作,除非系统崩溃或出现一百万个错误且没有输出。
压缩后的数据约为 165M,或者在 pandas 中加载后为 13G(它非常适合可用的 45G RAM)。
import pandas as pd
df = pd.read_parquet('data_simulated.parquet')
df.memory_usage(deep=True).sum() * 1e-9
# returns 13.09
df.head()
# prints the head of the dataframe properly
相反,如果使用 Dask
from dask.distributed import Client
import dask.dataframe as dataframe
client = Client()
# prints: <Client: 'tcp://127.0.0.1:38576' processes=7 threads=28, memory=48.32 GB>
df = dataframe.read_parquet('data_simulated.parquet')
df.memory_usage(deep=True).sum().compute() * 1e-9
打印
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
[a large traceback]
KilledWorker: ("('series-groupby-sum-chunk-memory_usage-dc8dab46de985e36d76a85bf3abaccbf', 0)", <Worker 'tcp://127.0.0.1:36882', name: 2, memory: 0, processing: 1>)
如果我尝试执行 df.head()、df.set_index(...) 或任何其他实际计算数据帧上的任何内容的操作,也会发生同样的情况。我试过减少工人的数量,这样每个人都有更多的内存。我也尝试过重新分区数据框,但它也因同样的错误而失败。如果我将客户端 LocalCluster 上的 memory_limit 设置为零,系统就会完全崩溃。
我做错了什么?
编辑: 这是有关数据的一些额外信息(通过 Pandas 加载得到)
In [2]: print(df.dtypes)
market_id uint32
choice_id uint64
attribute_1 bool
attribute_2 bool
attribute_3 bool
income float32
is_urban bool
distance float32
weight float32
quarter uint32
product_id int64
price float64
size float32
share float32
market_quarter int64
product_type object
outside_option int64
dtype: object
In [3]: print(df.shape)
(89429613, 17)
对象 product_type 是一个字符串。
Dask 通过分块加载和处理数据来工作。在 parquet 的情况下,该分块的起源来自数据文件本身:在内部 parquet 被组织成“行组”,即要一起读取的行集。
听起来在这种情况下,整个数据集由一个文件中的一个行组组成。这意味着 Dask 没有机会将数据拆分成块;你得到一个任务,它承担了一个工作人员的全部内存压力(可能等于总数据大小加上一些临时值),它只分配了总系统内存的一部分。因此出现错误。
请注意,您可以关闭内存监控以防止工作人员在 configuration 中或直接使用 memory_limit=0
等关键字被杀死。在这种情况下,您知道只有一名工作人员会执行负载。
在某些非常特殊的情况下(没有 nesting/list/map 类型),可以拆分行组,但是不存在用于此的代码,并且由于压缩和编码而效率低下数据。