group-by/apply 与 Pandas 和多处理
group-by/apply with Pandas and Multiprocessing
我正在尝试使用多处理对 pandas 数据帧进行分组并应用操作(希望加快我的代码速度)。例如,如果我有如下数据框:
A B C
cluster_id
1 1 2 3
1 1 2 3
2 4 5 6
2 7 8 9
我想在列上应用一个函数并按 cluster_id 对它们进行分组。在函数只是 sum
的简单情况下
def my_func(x):
return sum(x)
那么操作应该产生:
A B C
cluster_id
1 2 4 6
2 11 13 15
SO 上有一些类似的帖子,我确实设法接近了某个地方但还没有真正解决它。我的代码失败了,我不知道如何修复它。这是我想出的
import multiprocessing as mp
import pandas as pd
import numpy as np
def _apply_df(args):
df, func = args
return df.groupby(level=0).apply(func)
def mp_apply(df, func):
workers = 4
pool = mp.Pool(processes=workers)
split_dfs = np.array_split(df, workers, axis=1)
result = pool.map(_apply_df, [(d, func) for d in split_dfs])
pool.close()
result = sorted(result, key=lambda x: x[0])
return pd.concat([i[1] for i in result])
def my_func(x):
return sum(x)
if __name__ == '__main__':
df = pd.DataFrame([[1, 2, 3, 1], [1, 2, 3, 1], [4, 5, 6, 2], [7, 8, 9, 2]], columns=['A', 'B', 'C', 'cluster_id'])
df = df.set_index('cluster_id')
out = mp_apply(df, my_func)
print(out)
我收到错误:
TypeError: unsupported operand type(s) for +: 'int' and 'str'
而且看起来它在行
上失败了
result = pool.map(_apply_df, [(d, func) for d in split_dfs])
传递给 _apply_df
的参数 d
看起来是空的。
任何 help/ideas 高度赞赏。如果重要的话,我正在使用 Python 3.6。谢谢!
您的代码中出现问题的主要原因有 2 个
- 使用python的built-in求和函数。这是一个函数,它接受一个可迭代的数字和 return 它们的总和。
例如如果您尝试对数据帧 df 的一部分求和,您将得到相同的错误回溯
sum(df.loc[1])
TypeError Traceback (most recent call last)
<ipython-input-60-6dea0ab0880f> in <module>()
----> 1 sum(df.loc[1])
TypeError: unsupported operand type(s) for +: 'int' and 'str'
要解决此问题,您需要使用 pandas sum
函数,如下所示
df.loc[1].sum()
#output
A 2
B 4
C 6
dtype: int64
如您所见,这将产生预期的结果。即对 data-slice
中的列求和
第二期是"reduce"阶段。每个进程将 return 一个数据帧,行
result = sorted(result, key=lambda x: x[0])
return pd.concat([i[1] for i in result])
第一行将产生一个错误,因为每当结果的 none 有一个名为 0 的列时。与第二行类似的问题。这可以解决如下
return pd.concat(result,axis=1)
现在代码 运行 鉴于正在使用的数据,没有问题。
整体代码:
import multiprocessing as mp
import pandas as pd
import numpy as np
def _apply_df(args):
df, func = args
return df.groupby(level=0).apply(func)
def mp_apply(df, func):
workers = 4
pool = mp.Pool(processes=workers)
split_dfs = np.array_split(df, workers, axis=1)
result = pool.map(_apply_df, [(d, func) for d in split_dfs])
pool.close()
#result = sorted(result, key=lambda x: x[0])
return pd.concat(result,axis=1)
def my_func(x):
return x.sum()
if __name__ == '__main__':
df = pd.DataFrame([[1, 2, 3, 1], [1, 2, 3, 1], [4, 5, 6, 2], [7, 8, 9, 2]], columns=['A', 'B', 'C', 'cluster_id'])
df = df.set_index('cluster_id')
out = mp_apply(df, my_func)
print(out)
输出:
A B C
cluster_id
1 2 4 6
2 11 13 15
我正在尝试使用多处理对 pandas 数据帧进行分组并应用操作(希望加快我的代码速度)。例如,如果我有如下数据框:
A B C
cluster_id
1 1 2 3
1 1 2 3
2 4 5 6
2 7 8 9
我想在列上应用一个函数并按 cluster_id 对它们进行分组。在函数只是 sum
的简单情况下def my_func(x):
return sum(x)
那么操作应该产生:
A B C
cluster_id
1 2 4 6
2 11 13 15
SO 上有一些类似的帖子,我确实设法接近了某个地方但还没有真正解决它。我的代码失败了,我不知道如何修复它。这是我想出的
import multiprocessing as mp
import pandas as pd
import numpy as np
def _apply_df(args):
df, func = args
return df.groupby(level=0).apply(func)
def mp_apply(df, func):
workers = 4
pool = mp.Pool(processes=workers)
split_dfs = np.array_split(df, workers, axis=1)
result = pool.map(_apply_df, [(d, func) for d in split_dfs])
pool.close()
result = sorted(result, key=lambda x: x[0])
return pd.concat([i[1] for i in result])
def my_func(x):
return sum(x)
if __name__ == '__main__':
df = pd.DataFrame([[1, 2, 3, 1], [1, 2, 3, 1], [4, 5, 6, 2], [7, 8, 9, 2]], columns=['A', 'B', 'C', 'cluster_id'])
df = df.set_index('cluster_id')
out = mp_apply(df, my_func)
print(out)
我收到错误:
TypeError: unsupported operand type(s) for +: 'int' and 'str'
而且看起来它在行
上失败了result = pool.map(_apply_df, [(d, func) for d in split_dfs])
传递给 _apply_df
的参数 d
看起来是空的。
任何 help/ideas 高度赞赏。如果重要的话,我正在使用 Python 3.6。谢谢!
您的代码中出现问题的主要原因有 2 个
- 使用python的built-in求和函数。这是一个函数,它接受一个可迭代的数字和 return 它们的总和。 例如如果您尝试对数据帧 df 的一部分求和,您将得到相同的错误回溯
sum(df.loc[1])
TypeError Traceback (most recent call last) <ipython-input-60-6dea0ab0880f> in <module>() ----> 1 sum(df.loc[1]) TypeError: unsupported operand type(s) for +: 'int' and 'str'
要解决此问题,您需要使用 pandas sum
函数,如下所示
df.loc[1].sum()
#output
A 2
B 4
C 6
dtype: int64
如您所见,这将产生预期的结果。即对 data-slice
中的列求和第二期是"reduce"阶段。每个进程将 return 一个数据帧,行
result = sorted(result, key=lambda x: x[0])
return pd.concat([i[1] for i in result])
第一行将产生一个错误,因为每当结果的 none 有一个名为 0 的列时。与第二行类似的问题。这可以解决如下
return pd.concat(result,axis=1)
现在代码 运行 鉴于正在使用的数据,没有问题。
整体代码:
import multiprocessing as mp
import pandas as pd
import numpy as np
def _apply_df(args):
df, func = args
return df.groupby(level=0).apply(func)
def mp_apply(df, func):
workers = 4
pool = mp.Pool(processes=workers)
split_dfs = np.array_split(df, workers, axis=1)
result = pool.map(_apply_df, [(d, func) for d in split_dfs])
pool.close()
#result = sorted(result, key=lambda x: x[0])
return pd.concat(result,axis=1)
def my_func(x):
return x.sum()
if __name__ == '__main__':
df = pd.DataFrame([[1, 2, 3, 1], [1, 2, 3, 1], [4, 5, 6, 2], [7, 8, 9, 2]], columns=['A', 'B', 'C', 'cluster_id'])
df = df.set_index('cluster_id')
out = mp_apply(df, my_func)
print(out)
输出:
A B C
cluster_id
1 2 4 6
2 11 13 15