Python:pandas 数据帧上的多处理错误:客户端具有非平凡的状态,它是本地的且不可篡改的
Python: Multiprocessing error on pandas data frame: Clients have non-trivial state that is local and unpickleable
我有一个数据帧,我正在使用 groupby
将其分成多个数据帧。现在我想处理这些数据帧中的每一个,我已经为其编写了一个函数process_s2id
并行。我将整个代码放在 class
中,我正在使用另一个文件中的主函数执行它。但是我收到以下错误:
"Clients have non-trivial state that is local and unpickleable.",
_pickle.PicklingError: Pickling client objects is explicitly not supported.
Clients have non-trivial state that is local and unpickleable.
代码如下(我们在class中执行main()
函数):
import logging
import pandas as pd
from functools import partial
from multiprocessing import Pool, cpu_count
class TestClass:
def __init__(self):
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger()
def process_s2id(self, df, col, new_col):
dim2 = ['s2id', 'date', 'hours']
df_hour = df.groupby(dim2)[[col, 'orders']].sum().reset_index()
df_hour[new_col] = df_hour[col] / df_hour['orders']
df_hour = df_hour[dim2 + [new_col]]
return df_hour
def run_parallel(self, df):
series = [frame for keys, frame in df.groupby('s2id')]
p = Pool(cpu_count())
prod_x = partial(
self.process_s2id,
col ="total_supply",
new_col = "supply"
)
s2id_supply_list = p.map(prod_x, series)
p.close()
p.join()
s2id_supply = pd.concat(s2id_supply_list, axis=0)
return ms2id_bsl
def main(self):
data = pd.read_csv("data/interim/fs.csv")
out = self.run_parallel(data)
return out
我在 Spyder 中试过 运行 这段代码,它工作正常。但是当我从另一个文件执行它时。我收到一个错误。以下为执行文件代码及错误:
import TestClass
def main():
tc = TestClass()
data = tc.main()
if __name__ == '__main__':
main()
当我查看错误回溯时,我发现错误发生在函数开始并行的行 s2id_supply_list = p.map(prod_x, series)
上。我也在系列中尝试了 运行 这个并且它有效。另外,我注意到这个特定错误来自 client.py
from Google cloud package。有一段代码,我在其中将数据上传到 Google 云,但这对于这段代码应该是不变的。我努力搜索这个错误,但所有结果都链接到 Google 云包相关问题,而不是多处理包。
任何人都可以帮助我理解这个错误,我该如何解决它?
其他信息:
我有以下版本的软件包:
python==3.7.7
pandas==1.0.5
google-cloud-storage==1.20.0
google-cloud-core==1.0.3
我在 macbook pro 上 运行 这个
我想通了。当我们在一个函数上使用 Pool
到 运行 它并行时,它期望第一个参数是迭代器。换句话说,该函数将 运行 并行处理第一个参数的不同值。当我们在 class 中有一个非静态函数时,我们的第一个参数是 self
或 class 本身。但是愚蠢的 Pool
函数不知道如何使用 self
进行迭代,因为它是错误的参数。正确的参数是第二个。
我们可以通过以下任一方式解决此问题:
- 从 class 中取出函数并将
self
踢出参数。
- 在函数顶部添加
@staticmethod
并将 self
踢出参数。
我希望这对遇到类似问题的人有所帮助。
我有一个数据帧,我正在使用 groupby
将其分成多个数据帧。现在我想处理这些数据帧中的每一个,我已经为其编写了一个函数process_s2id
并行。我将整个代码放在 class
中,我正在使用另一个文件中的主函数执行它。但是我收到以下错误:
"Clients have non-trivial state that is local and unpickleable.",
_pickle.PicklingError: Pickling client objects is explicitly not supported.
Clients have non-trivial state that is local and unpickleable.
代码如下(我们在class中执行main()
函数):
import logging
import pandas as pd
from functools import partial
from multiprocessing import Pool, cpu_count
class TestClass:
def __init__(self):
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger()
def process_s2id(self, df, col, new_col):
dim2 = ['s2id', 'date', 'hours']
df_hour = df.groupby(dim2)[[col, 'orders']].sum().reset_index()
df_hour[new_col] = df_hour[col] / df_hour['orders']
df_hour = df_hour[dim2 + [new_col]]
return df_hour
def run_parallel(self, df):
series = [frame for keys, frame in df.groupby('s2id')]
p = Pool(cpu_count())
prod_x = partial(
self.process_s2id,
col ="total_supply",
new_col = "supply"
)
s2id_supply_list = p.map(prod_x, series)
p.close()
p.join()
s2id_supply = pd.concat(s2id_supply_list, axis=0)
return ms2id_bsl
def main(self):
data = pd.read_csv("data/interim/fs.csv")
out = self.run_parallel(data)
return out
我在 Spyder 中试过 运行 这段代码,它工作正常。但是当我从另一个文件执行它时。我收到一个错误。以下为执行文件代码及错误:
import TestClass
def main():
tc = TestClass()
data = tc.main()
if __name__ == '__main__':
main()
当我查看错误回溯时,我发现错误发生在函数开始并行的行 s2id_supply_list = p.map(prod_x, series)
上。我也在系列中尝试了 运行 这个并且它有效。另外,我注意到这个特定错误来自 client.py
from Google cloud package。有一段代码,我在其中将数据上传到 Google 云,但这对于这段代码应该是不变的。我努力搜索这个错误,但所有结果都链接到 Google 云包相关问题,而不是多处理包。
任何人都可以帮助我理解这个错误,我该如何解决它?
其他信息: 我有以下版本的软件包:
python==3.7.7
pandas==1.0.5
google-cloud-storage==1.20.0
google-cloud-core==1.0.3
我在 macbook pro 上 运行 这个
我想通了。当我们在一个函数上使用 Pool
到 运行 它并行时,它期望第一个参数是迭代器。换句话说,该函数将 运行 并行处理第一个参数的不同值。当我们在 class 中有一个非静态函数时,我们的第一个参数是 self
或 class 本身。但是愚蠢的 Pool
函数不知道如何使用 self
进行迭代,因为它是错误的参数。正确的参数是第二个。
我们可以通过以下任一方式解决此问题:
- 从 class 中取出函数并将
self
踢出参数。 - 在函数顶部添加
@staticmethod
并将self
踢出参数。
我希望这对遇到类似问题的人有所帮助。