在 Dask DataFrame.apply() 上,在处理实际行之前接收 n 行值为 1
On Dask DataFrame.apply(), receiving n rows of value 1 before actual rows processed
在下面的代码片段中,我希望日志打印出数字 0 - 4。我知道这些数字可能不是这个顺序,因为任务会被分解成多个并行操作。
代码片段:
from dask import dataframe as dd
import numpy as np
import pandas as pd
df = pd.DataFrame({'A': np.arange(5),
'B': np.arange(5),
'C': np.arange(5)})
ddf = dd.from_pandas(df, npartitions=1)
def aggregate(x):
print('B val received: ' + str(x.B))
return x
ddf.apply(aggregate, axis=1).compute()
但是当上面的代码是运行时,我看到的是:
B val received: 1
B val received: 1
B val received: 1
B val received: 0
B val received: 0
B val received: 1
B val received: 2
B val received: 3
B val received: 4
我看到的不是 0 - 4,而是首先打印的一系列 1,然后是额外的 0。我注意到每次设置 Dask DataFrame 和 运行 对其进行 apply
操作。
打印 dataframe 时没有显示其他值为 1 的行:
A B C
0 0 0 0
1 1 1 1
2 2 2 2
3 3 3 3
4 4 4 4
我的问题是:这些值为 1 的行来自哪里?为什么它们似乎始终出现在数据框中的 "actual" 行之前? 1 值似乎与实际行中的值无关(也就是说,它不像是出于某种原因额外抓取第二行几次)。
Dask 在尝试对整个分区集合执行操作之前,会对您告诉它执行的操作进行一些检查。这就是前几个打印语句的来源。它是内置错误检查的一部分,可防止 Dask 进行一些冗长的系列操作并最终失败。
@Grr 的回答是正确的。 Dask.dataframe 不知道你的函数会产生什么,但仍然必须为你提供一个具有正确类型、数据类型等的惰性 dask.dataframe,所以它会在一些数据上尝试你的函数.
您可以通过使用 meta=
关键字提供有关预期输出的元数据来避免这些检查(DataFrame.apply docstring 中有更多详细信息)。如果您提供此信息,则 Dask.dataframe 将不需要尝试您的函数来确定类型。
在此处复制此部分:
文档字符串
元:pd.DataFrame、pd.Series、字典、可迭代、元组、可选
空的 pd.DataFrame 或 pd.Series 匹配输出的数据类型和列名。这个元数据对于 dask dataframe 中的许多算法来说是必不可少的。为了便于使用,还提供了一些替代输入。可以提供 {name: dtype} 的字典或 (name, dtype) 的可迭代对象,而不是 DataFrame。可以使用 (name, dtype) 的元组代替系列。如果未提供,dask 将尝试推断元数据。这可能会导致意想不到的结果,因此建议提供元数据。有关详细信息,请参阅 dask.dataframe.utils.make_meta.
解决方案
因此,如果您将示例输出创建为空数据框,那么您会没事的:
meta = pd.DataFrame({'A': [1], 'B': [2], 'C': [3]},
columns=['A', 'B', 'C'])
ddf.apply(aggregate, axis=1, meta=meta)
或者,在这种情况下,因为您的函数不会更改输入的列或数据类型,所以您可以只使用输入的元数据
ddf.apply(aggregate, axis=1, meta=ddf.meta)
在下面的代码片段中,我希望日志打印出数字 0 - 4。我知道这些数字可能不是这个顺序,因为任务会被分解成多个并行操作。
代码片段:
from dask import dataframe as dd
import numpy as np
import pandas as pd
df = pd.DataFrame({'A': np.arange(5),
'B': np.arange(5),
'C': np.arange(5)})
ddf = dd.from_pandas(df, npartitions=1)
def aggregate(x):
print('B val received: ' + str(x.B))
return x
ddf.apply(aggregate, axis=1).compute()
但是当上面的代码是运行时,我看到的是:
B val received: 1
B val received: 1
B val received: 1
B val received: 0
B val received: 0
B val received: 1
B val received: 2
B val received: 3
B val received: 4
我看到的不是 0 - 4,而是首先打印的一系列 1,然后是额外的 0。我注意到每次设置 Dask DataFrame 和 运行 对其进行 apply
操作。
打印 dataframe 时没有显示其他值为 1 的行:
A B C
0 0 0 0
1 1 1 1
2 2 2 2
3 3 3 3
4 4 4 4
我的问题是:这些值为 1 的行来自哪里?为什么它们似乎始终出现在数据框中的 "actual" 行之前? 1 值似乎与实际行中的值无关(也就是说,它不像是出于某种原因额外抓取第二行几次)。
Dask 在尝试对整个分区集合执行操作之前,会对您告诉它执行的操作进行一些检查。这就是前几个打印语句的来源。它是内置错误检查的一部分,可防止 Dask 进行一些冗长的系列操作并最终失败。
@Grr 的回答是正确的。 Dask.dataframe 不知道你的函数会产生什么,但仍然必须为你提供一个具有正确类型、数据类型等的惰性 dask.dataframe,所以它会在一些数据上尝试你的函数.
您可以通过使用 meta=
关键字提供有关预期输出的元数据来避免这些检查(DataFrame.apply docstring 中有更多详细信息)。如果您提供此信息,则 Dask.dataframe 将不需要尝试您的函数来确定类型。
在此处复制此部分:
文档字符串
元:pd.DataFrame、pd.Series、字典、可迭代、元组、可选
空的 pd.DataFrame 或 pd.Series 匹配输出的数据类型和列名。这个元数据对于 dask dataframe 中的许多算法来说是必不可少的。为了便于使用,还提供了一些替代输入。可以提供 {name: dtype} 的字典或 (name, dtype) 的可迭代对象,而不是 DataFrame。可以使用 (name, dtype) 的元组代替系列。如果未提供,dask 将尝试推断元数据。这可能会导致意想不到的结果,因此建议提供元数据。有关详细信息,请参阅 dask.dataframe.utils.make_meta.
解决方案
因此,如果您将示例输出创建为空数据框,那么您会没事的:
meta = pd.DataFrame({'A': [1], 'B': [2], 'C': [3]},
columns=['A', 'B', 'C'])
ddf.apply(aggregate, axis=1, meta=meta)
或者,在这种情况下,因为您的函数不会更改输入的列或数据类型,所以您可以只使用输入的元数据
ddf.apply(aggregate, axis=1, meta=ddf.meta)