如何将 Future 与 dask.distrubuted(Python 库)中执行器的 map 方法一起使用?
How to use Future with map method of the Executor from dask.distrubuted (Python library)?
我是运行dask.distributed集群。
我的任务包括链式计算,其中最后一步是使用 Executor.map
方法对在前面的步骤中创建的列表进行并行处理。列表的长度是事先不知道的,因为它是在计算过程中从中间结果生成的。
代码如下所示:
from distributed import Executor, progress
def process():
e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
port=config('SERVER_PORT')))
futures = []
gen_list1 = get_list_1()
gen_f1 = e.map(generate_1, gen_list1)
futures.append(gen_f1)
gen_list2 = get_list_2()
gen_f2 = e.map(generate_2, gen_list2)
futures.append(gen_f2)
m_list = e.submit(create_m_list) # m_list is created from gen_list1 and gen_list2
# some results of processing are stored in the database
# and create_m_list doesn't need additional arguments
futures.append(m_list)
m_result = e.map(process_m_list, m_list)
futures.append(m_result)
return futures
if __name__ == '__main__':
r = process()
progress(r)
但是,我收到错误 TypeError: zip argument #1 must support iteration
:
File "F:/wl/under_development/database/jobs.py", line 366, in start_job
match_result = e.map(process_m_list, m_list)
File "C:\Anaconda\lib\site-packages\distributed\executor.py", line 672, in map
iterables = list(zip(*zip(*iterables)))
TypeError: zip argument #1 must support iteration
gen_list1
和 gen_list2
是独立计算的,但是 m_list
是从 gen_list1
和 gen_list2
创建的,因此依赖于它们。
我也试过调用 m_list
的 .result()
方法,但是,它阻塞了 process
函数,直到 gen_list1
和 gen_list2
的计算已经完成了。
我也试过调用 m_list
的异步方法 ._result
,但它产生了同样的错误 "zip argument #1 must support iteration"。 dask.delayed
(m_result = e.map(process_m_list, delayed(m_list))
).
也出现了同样的错误
dask.distributed
的文档在这方面含糊不清,示例仅提及已经存在的真实列表对象。但是,SO 中的其他帖子以及 Google 表明这应该是可能的。
这是我的 Python 发行版
的版本字符串
Python 2.7.11 |Anaconda custom (64-bit)| (default, Feb 16 2016, 09:58:36) [MSC v.1500 64 bit (AMD64)] on win32
你的问题症结似乎在这里:
m_list = e.submit(create_m_list)
m_result = e.map(process_m_list, m_list)
你说得对,你不能将功能映射到个人未来。您需要传递 map
一个序列。 Dask 在不了解您的数据的情况下不知道要提交多少功能。在 future 上调用 .result()
将是一个很好的解决方案:
m_list = e.submit(create_m_list)
m_result = e.map(process_m_list, m_list.result())
I've also tried calling .result() method of m_list, however, it has blocked the function process until computations of gen_list1 and gen_list2 have finished.
没错。如果没有任何附加信息,调度程序将更喜欢较早提交的计算。您可以通过先提交 create_m_list
函数,然后提交您的额外计算,然后等待 create_m_list
结果来解决此问题。
m_list = e.submit(create_m_list) # give this highest priority
f1 = e.map(generate_1, get_list_1())
f2 = e.map(generate_2, gen_list_2())
L = m_list.result() # block on m_list until done
m_result = e.map(process_m_list, L) # submit more tasks
return [f1, f2, m_result]
我是运行dask.distributed集群。
我的任务包括链式计算,其中最后一步是使用 Executor.map
方法对在前面的步骤中创建的列表进行并行处理。列表的长度是事先不知道的,因为它是在计算过程中从中间结果生成的。
代码如下所示:
from distributed import Executor, progress
def process():
e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
port=config('SERVER_PORT')))
futures = []
gen_list1 = get_list_1()
gen_f1 = e.map(generate_1, gen_list1)
futures.append(gen_f1)
gen_list2 = get_list_2()
gen_f2 = e.map(generate_2, gen_list2)
futures.append(gen_f2)
m_list = e.submit(create_m_list) # m_list is created from gen_list1 and gen_list2
# some results of processing are stored in the database
# and create_m_list doesn't need additional arguments
futures.append(m_list)
m_result = e.map(process_m_list, m_list)
futures.append(m_result)
return futures
if __name__ == '__main__':
r = process()
progress(r)
但是,我收到错误 TypeError: zip argument #1 must support iteration
:
File "F:/wl/under_development/database/jobs.py", line 366, in start_job
match_result = e.map(process_m_list, m_list)
File "C:\Anaconda\lib\site-packages\distributed\executor.py", line 672, in map
iterables = list(zip(*zip(*iterables)))
TypeError: zip argument #1 must support iteration
gen_list1
和 gen_list2
是独立计算的,但是 m_list
是从 gen_list1
和 gen_list2
创建的,因此依赖于它们。
我也试过调用 m_list
的 .result()
方法,但是,它阻塞了 process
函数,直到 gen_list1
和 gen_list2
的计算已经完成了。
我也试过调用 m_list
的异步方法 ._result
,但它产生了同样的错误 "zip argument #1 must support iteration"。 dask.delayed
(m_result = e.map(process_m_list, delayed(m_list))
).
dask.distributed
的文档在这方面含糊不清,示例仅提及已经存在的真实列表对象。但是,SO 中的其他帖子以及 Google 表明这应该是可能的。
这是我的 Python 发行版
的版本字符串Python 2.7.11 |Anaconda custom (64-bit)| (default, Feb 16 2016, 09:58:36) [MSC v.1500 64 bit (AMD64)] on win32
你的问题症结似乎在这里:
m_list = e.submit(create_m_list)
m_result = e.map(process_m_list, m_list)
你说得对,你不能将功能映射到个人未来。您需要传递 map
一个序列。 Dask 在不了解您的数据的情况下不知道要提交多少功能。在 future 上调用 .result()
将是一个很好的解决方案:
m_list = e.submit(create_m_list)
m_result = e.map(process_m_list, m_list.result())
I've also tried calling .result() method of m_list, however, it has blocked the function process until computations of gen_list1 and gen_list2 have finished.
没错。如果没有任何附加信息,调度程序将更喜欢较早提交的计算。您可以通过先提交 create_m_list
函数,然后提交您的额外计算,然后等待 create_m_list
结果来解决此问题。
m_list = e.submit(create_m_list) # give this highest priority
f1 = e.map(generate_1, get_list_1())
f2 = e.map(generate_2, gen_list_2())
L = m_list.result() # block on m_list until done
m_result = e.map(process_m_list, L) # submit more tasks
return [f1, f2, m_result]