Dask groupby over each column separately gives wrong results
I am simulating what I really want to do with dummy data here. The steps I need to perform:
- Apply some transformations to each column separately.
- Perform a groupby on a target column to aggregate some metrics for each column.
The code I use to simulate this:
import dask.dataframe as dd
from dask.distributed import Client, as_completed, LocalCluster

cluster = LocalCluster(processes=False)
client = Client(cluster, asynchronous=True)

csv_loc = '/Users/apple/Downloads/iris.data'
df = dd.read_csv(csv_loc)  # of course, AWS credentials would be needed here; omitting them, assuming you can read from S3 or elsewhere
client.persist(df)

cols = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']

# This is needed because I am doing some custom operation on the actual data
for c in cols:
    if c != 'species':
        df[c] = df[c].map(lambda x: x * 10)

client.persist(df)  # Is this the trouble?

def agg_bivars(col_name):
    agg_df = df.groupby('species')[col_name].sum().compute()
    return {col_name: agg_df}

agg_futures = client.map(agg_bivars, ['sepal_length', 'sepal_width', 'petal_length', 'petal_width'])

for batch in as_completed(agg_futures, with_results=True).batches():
    for future, result in batch:
        print('result: {}'.format(result))

client.restart()
client.close()
cluster.close()
You can download the data from this link. It is a very standard dataset that is widely available online.
Result I get: the same groupby result for every column.
Expected result: a different groupby result for each column.
Result:
result: {'sepal_width': species
Iris-setosa 2503.0
Iris-versicolor 2968.0
Iris-virginica 3294.0
Name: sepal_length, dtype: float64}
result: {'sepal_length': species
Iris-setosa 2503.0
Iris-versicolor 2968.0
Iris-virginica 3294.0
Name: sepal_length, dtype: float64}
result: {'petal_width': species
Iris-setosa 2503.0
Iris-versicolor 2968.0
Iris-virginica 3294.0
Name: sepal_length, dtype: float64}
result: {'petal_length': species
Iris-setosa 2503.0
Iris-versicolor 2968.0
Iris-virginica 3294.0
Name: sepal_length, dtype: float64}
Process finished with exit code 0
If I just do a groupby on df, it works fine. However, the issue here is that I must apply some transformations to the whole df separately before each column's groupby. Note that I am calling client.persist(df) twice. I do it the second time because I want whatever new transformations I applied to be persisted as well, so that I can query them quickly.
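A minimal sketch of the reassignment idiom I could try instead (just an assumption on my part, not something I have verified to fix this): client.persist returns a new, persisted collection rather than persisting the variable in place, so the return value is usually assigned back.

df = dd.read_csv(csv_loc)
df = client.persist(df)  # keep working with the collection returned by persist

for c in cols:
    if c != 'species':
        df[c] = df[c].map(lambda x: x * 10)

df = client.persist(df)  # same idiom again after the per-column transformations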
The problem is the compute() inside the agg_bivars function.
Try the code below:
def agg_bivars(col_name):
    agg_df = df.groupby('species')[col_name].sum()  # .compute()
    return {col_name: agg_df}

agg_futures = client.map(agg_bivars, ['sepal_length', 'sepal_width', 'petal_length', 'petal_width'])

for batch in as_completed(futures=agg_futures, with_results=True).batches():
    for future, result in batch:
        print(f'result: {list(result.values())[0].compute()}')
Result:
result: species
setosa 2503.0
versicolor 2968.0
virginica 3294.0
Name: sepal_length, dtype: float64
result: species
setosa 1709.0
versicolor 1385.0
virginica 1487.0
Name: sepal_width, dtype: float64
result: species
setosa 732.0
versicolor 2130.0
virginica 2776.0
Name: petal_length, dtype: float64
result: species
setosa 122.0
versicolor 663.0
virginica 1013.0
Name: petal_width, dtype: float64
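If you do not actually need a separate future per column, a simpler variant of the same idea is to build all the lazy aggregations first and trigger a single computation at the end. This is just a sketch assuming the same persisted df as above:

import dask

target_cols = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']

# Build one lazy groupby-sum per column; nothing is executed yet.
lazy_aggs = {c: df.groupby('species')[c].sum() for c in target_cols}

# Compute all of them in one pass and keep the column -> Series mapping.
(computed,) = dask.compute(lazy_aggs)
for col, series in computed.items():
    print(f'result: {series}')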
In my opinion, you are overcomplicating things.
Pandas
import pandas as pd
df = pd.read_csv("iris.csv")
df[df.columns[:-1]] = df[df.columns[:-1]] * 10
df.groupby("species").sum()
sepal_length sepal_width petal_length petal_width
species
setosa 2503.0 1709.0 732.0 122.0
versicolor 2968.0 1385.0 2130.0 663.0
virginica 3294.0 1487.0 2776.0 1013.0
Dask
import dask.dataframe as dd

df = dd.read_csv("iris.csv")

for col in df.columns[:-1]:
    df[col] = df[col] * 10

df.groupby("species").sum().compute()
sepal_length sepal_width petal_length petal_width
species
setosa 2503.0 1709.0 732.0 122.0
versicolor 2968.0 1385.0 2130.0 663.0
virginica 3294.0 1487.0 2776.0 1013.0
Then, if you want the result as a dict, you just need to append to_dict() to the output.
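For example (a minimal sketch using the same Dask dataframe as above):

# to_dict() turns the computed pandas DataFrame into a nested dict: {column: {species: value}}
result = df.groupby("species").sum().compute().to_dict()
print(result["sepal_length"])  # {'setosa': 2503.0, 'versicolor': 2968.0, 'virginica': 3294.0}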