在 Dask DataFrame.apply() 上,在处理实际行之前接收 n 行值为 1

On Dask DataFrame.apply(), receiving n rows of value 1 before actual rows processed

在下面的代码片段中,我希望日志打印出数字 0 - 4。我知道这些数字可能不是这个顺序,因为任务会被分解成多个并行操作。

代码片段:

from dask import dataframe as dd
import numpy as np
import pandas as pd

df = pd.DataFrame({'A': np.arange(5),
                   'B': np.arange(5),
                   'C': np.arange(5)})

ddf = dd.from_pandas(df, npartitions=1)

def aggregate(x):
    print('B val received: ' + str(x.B))
    return x

ddf.apply(aggregate, axis=1).compute()

但是当上面的代码是运行时,我看到的是:

B val received: 1
B val received: 1
B val received: 1
B val received: 0
B val received: 0
B val received: 1
B val received: 2
B val received: 3
B val received: 4

我看到的不是 0 - 4,而是首先打印的一系列 1,然后是额外的 0。我注意到每次设置 Dask DataFrame 和 运行 对其进行 apply 操作。

打印 dataframe 时没有显示其他值为 1 的行:

   A  B  C
0  0  0  0
1  1  1  1
2  2  2  2
3  3  3  3
4  4  4  4

我的问题是:这些值为 1 的行来自哪里?为什么它们似乎始终出现在数据框中的 "actual" 行之前? 1 值似乎与实际行中的值无关(也就是说,它不像是出于某种原因额外抓取第二行几次)。

Dask 在尝试对整个分区集合执行操作之前,会对您告诉它执行的操作进行一些检查。这就是前几个打印语句的来源。它是内置错误检查的一部分,可防止 Dask 进行一些冗长的系列操作并最终失败。

@Grr 的回答是正确的。 Dask.dataframe 不知道你的函数会产生什么,但仍然必须为你提供一个具有正确类型、数据类型等的惰性 dask.dataframe,所以它会在一些数据上尝试你的函数.

您可以通过使用 meta= 关键字提供有关预期输出的元数据来避免这些检查(DataFrame.apply docstring 中有更多详细信息)。如果您提供此信息,则 Dask.dataframe 将不需要尝试您的函数来确定类型。

在此处复制此部分:

文档字符串

元:pd.DataFrame、pd.Series、字典、可迭代、元组、可选

空的 pd.DataFrame 或 pd.Series 匹配输出的数据类型和列名。这个元数据对于 dask dataframe 中的许多算法来说是必不可少的。为了便于使用,还提供了一些替代输入。可以提供 {name: dtype} 的字典或 (name, dtype) 的可迭代对象,而不是 DataFrame。可以使用 (name, dtype) 的元组代替系列。如果未提供,dask 将尝试推断元数据。这可能会导致意想不到的结果,因此建议提供元数据。有关详细信息,请参阅 dask.dataframe.utils.make_meta.

解决方案

因此,如果您将示例输出创建为空数据框,那么您会没事的:

meta = pd.DataFrame({'A': [1], 'B': [2], 'C': [3]}, 
                    columns=['A', 'B', 'C'])
ddf.apply(aggregate, axis=1, meta=meta)

或者,在这种情况下,因为您的函数不会更改输入的列或数据类型,所以您可以只使用输入的元数据

ddf.apply(aggregate, axis=1, meta=ddf.meta)