Python concurrent.futures - TypeError: zip argument #1 must support iteration
Python concurrent.futures - TypeError: zip argument #1 must support iteration
我想使用多处理以 1000 个为一批处理 mongodb 个文档。但是,下面的代码片段给出了 TypeError: zip argument #1 must support iteration
代码:
def documents_processing(skip):
conn = get_connection()
db = conn["db_name"]
print("Process::{} -- db.Transactions.find(no_cursor_timeout=True).skip({}).limit(10000)".format(os.getpid(), skip))
documents = db.Transactions.find(no_cursor_timeout=True).skip(skip).limit(10000)
# Do some processing in mongodb
max_workers = 20
def skip_list():
for i in range(0, 100000, 10000):
yield [j for j in range(i, i + 10000, 1000)]
def main_f():
try:
with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
executor.map(documents_processing, skip_list)
except Exception:
print("exception:", traceback.format_exc())
main_f()
错误回溯:
(rpc_venv) [user@localhost ver2_mt]$ python main_mongo_v3.py
exception: Traceback (most recent call last):
File "main_mongo_v3.py", line 113, in main_f
executor.map(documents_processing, skip_list)
File "/usr/lib64/python3.6/concurrent/futures/process.py", line 496, in map
timeout=timeout)
File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 575, in map
fs = [self.submit(fn, *args) for args in zip(*iterables)]
File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 575, in <listcomp>
fs = [self.submit(fn, *args) for args in zip(*iterables)]
File "/usr/lib64/python3.6/concurrent/futures/process.py", line 137, in _get_chunks
it = zip(*iterables)
TypeError: zip argument #1 must support iteration
如何解决这个错误?谢谢。
调用 skip_list
函数到 return 生成器。
目前,您传递的是函数作为第二个参数,而不是可迭代对象。
executor.map(documents_processing, skip_list())
由于您在从 n 开始的每个进程中检索 10k 个文档,您可以将 skip_list
声明为:
def skip_list():
for i in range(0, 100000, 10000):
yield i
我想使用多处理以 1000 个为一批处理 mongodb 个文档。但是,下面的代码片段给出了 TypeError: zip argument #1 must support iteration
代码:
def documents_processing(skip):
conn = get_connection()
db = conn["db_name"]
print("Process::{} -- db.Transactions.find(no_cursor_timeout=True).skip({}).limit(10000)".format(os.getpid(), skip))
documents = db.Transactions.find(no_cursor_timeout=True).skip(skip).limit(10000)
# Do some processing in mongodb
max_workers = 20
def skip_list():
for i in range(0, 100000, 10000):
yield [j for j in range(i, i + 10000, 1000)]
def main_f():
try:
with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
executor.map(documents_processing, skip_list)
except Exception:
print("exception:", traceback.format_exc())
main_f()
错误回溯:
(rpc_venv) [user@localhost ver2_mt]$ python main_mongo_v3.py
exception: Traceback (most recent call last):
File "main_mongo_v3.py", line 113, in main_f
executor.map(documents_processing, skip_list)
File "/usr/lib64/python3.6/concurrent/futures/process.py", line 496, in map
timeout=timeout)
File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 575, in map
fs = [self.submit(fn, *args) for args in zip(*iterables)]
File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 575, in <listcomp>
fs = [self.submit(fn, *args) for args in zip(*iterables)]
File "/usr/lib64/python3.6/concurrent/futures/process.py", line 137, in _get_chunks
it = zip(*iterables)
TypeError: zip argument #1 must support iteration
如何解决这个错误?谢谢。
调用 skip_list
函数到 return 生成器。
目前,您传递的是函数作为第二个参数,而不是可迭代对象。
executor.map(documents_processing, skip_list())
由于您在从 n 开始的每个进程中检索 10k 个文档,您可以将 skip_list
声明为:
def skip_list():
for i in range(0, 100000, 10000):
yield i