关于 Dask 的问题——如何使用 pandas 数据框 (groupby/apply) 合并到脚本中
Question on Dask -- how to incorporate to a script using pandas dataframe (groupby/apply)
我正在尝试稍微更改我的代码以合并 Dask,因为我有太多数据需要 pandas 处理。
这是我的 pandas 数据框:
df = df1.merge(df2, how='inner', left_on=['a', 'b', 'c'],
right_on=['a', 'b', 'c'],
suffixes=('', '_DROP'))
df = df.filter(regex='^(?!.*_DROP)').sort_values(['a', 'b'])
df['x'] = df.groupby('a').apply(
lambda group: 1 * (group['b'] == group['b'].min())).reset_index(level=0, drop=True)
df['z'] = df.groupby('a')['m'].shift(1, fill_value=0)
使用任务数据框,到目前为止我有这个:
df = df1.merge(df2, how='inner', left_on=['a', 'b', 'c'],
right_on=['a', 'b', 'c'],
suffixes=('', '_DROP'))
keep_columns = list(filter(lambda v: match('^(?!.*_DROP)', v), df.columns))
df = df[keep_columns]
但是因为排序的原因,我cannot/do不知道如何进行groupby+apply和保持顺序。也许合并?
另一个是使用 'a'+'b' 创建索引,就好像它是一个多级索引——Dask 不支持多级。但不确定这是否是最好的方法。
这是带有内联注释的代码:
import dask.dataframe as dd
import numpy as np
import pandas as pd
df1 = pd.DataFrame(np.random.randint(5, size=(300, 4)), columns=list("mabc"))
df2 = pd.DataFrame(np.random.randint(5, size=(500, 5)), columns=list("abcde"))
ddf1 = dd.from_pandas(df1, npartitions=3)
ddf2 = dd.from_pandas(df2, npartitions=4)
# obviate the need to drop columns after the merge
ddf = ddf1.merge(ddf2[[c for c in ddf2.columns if c in ddf1.columns]] , how='inner', on=['a', 'b', 'c'])
# apply functions on groupby
def func(df):
df = df.sort_values('b')
df['x'] = (df['b']==df['b'].min())
df['z'] = df['m'].shift(1, fill_value=0)
return df
new_ddf = ddf.groupby('a').apply(func, meta=pd.DataFrame(columns=ddf.columns.tolist()+['x','z'])).reset_index(drop=True)
我正在尝试稍微更改我的代码以合并 Dask,因为我有太多数据需要 pandas 处理。
这是我的 pandas 数据框:
df = df1.merge(df2, how='inner', left_on=['a', 'b', 'c'],
right_on=['a', 'b', 'c'],
suffixes=('', '_DROP'))
df = df.filter(regex='^(?!.*_DROP)').sort_values(['a', 'b'])
df['x'] = df.groupby('a').apply(
lambda group: 1 * (group['b'] == group['b'].min())).reset_index(level=0, drop=True)
df['z'] = df.groupby('a')['m'].shift(1, fill_value=0)
使用任务数据框,到目前为止我有这个:
df = df1.merge(df2, how='inner', left_on=['a', 'b', 'c'],
right_on=['a', 'b', 'c'],
suffixes=('', '_DROP'))
keep_columns = list(filter(lambda v: match('^(?!.*_DROP)', v), df.columns))
df = df[keep_columns]
但是因为排序的原因,我cannot/do不知道如何进行groupby+apply和保持顺序。也许合并?
另一个是使用 'a'+'b' 创建索引,就好像它是一个多级索引——Dask 不支持多级。但不确定这是否是最好的方法。
这是带有内联注释的代码:
import dask.dataframe as dd
import numpy as np
import pandas as pd
df1 = pd.DataFrame(np.random.randint(5, size=(300, 4)), columns=list("mabc"))
df2 = pd.DataFrame(np.random.randint(5, size=(500, 5)), columns=list("abcde"))
ddf1 = dd.from_pandas(df1, npartitions=3)
ddf2 = dd.from_pandas(df2, npartitions=4)
# obviate the need to drop columns after the merge
ddf = ddf1.merge(ddf2[[c for c in ddf2.columns if c in ddf1.columns]] , how='inner', on=['a', 'b', 'c'])
# apply functions on groupby
def func(df):
df = df.sort_values('b')
df['x'] = (df['b']==df['b'].min())
df['z'] = df['m'].shift(1, fill_value=0)
return df
new_ddf = ddf.groupby('a').apply(func, meta=pd.DataFrame(columns=ddf.columns.tolist()+['x','z'])).reset_index(drop=True)