复杂dask图创建的最简单方法
Simplest way complex dask graph creation
有些对象有一个复杂的计算系统。
难点在于有些计算是分组计算
这可以通过下面的例子来证明:
from dask distributed import client
def load_data_from_db(id):
# load some data
...
return data
def task_a(data):
# some calculations
...
return result
def group_task(*args):
# some calculations
...
return result
def task_b(data, group_data):
# some calculations
...
return result
def task_c(data, task_a_result)
# some calculations
...
return result
ids = [1, 2]
dsk = {'id_{}'.format(i): id for i, id in enumerate(ids)}
dsk['data_0'] = (load_data_from_db, 'id_0')
dsk['data_1'] = (load_data_from_db, 'id_1')
dsk['task_a_result_0'] = (task_a, 'data_0')
dsk['task_a_result_1'] = (task_a, 'data_1')
dsk['group_result'] = (
group_task,
'data_0', 'task_a_result_0',
'data_1', 'task_a_result_1')
dsk['task_b_result_0'] = (task_b, 'data_0', 'group_result')
dsk['task_b_result_1'] = (task_b, 'data_1', 'group_result')
dsk['task_c_result_0'] = (task_c, 'data_0', 'task_a_result_0')
dsk['task_c_result_1'] = (task_c, 'data_1', 'task_a_result_1')
client = Client(scheduler_address)
result = client.get(
dsk,
['task_a_result_0',
'task_b_result_0',
'task_c_result_0',
'task_a_result_1',
'task_b_result_1',
'task_c_result_1'])
统计的对象列表有千个元素,任务个数有几十个(包括几个组任务)。
使用这种图创建方法很难修改图(添加新任务、更改依赖项等)。
对于这些上下文,是否有使用 dask 的更有效的分布式计算方法?
已添加
与期货图表是:
client = Client(scheduler_address)
ids = [1, 2]
data = client.map(load_data_from_db, ids)
result_a = client.map(task_a, data)
group_args = list(chain(*zip(data, result_a)))
result_group = client.submit(task_group, *group_args)
result_b = client.map(task_b, data, [result_group] * len(ids))
result_c = client.map(task_c, data, result_a)
result = client.gather(result_a + result_b + result_c)
并且在任务函数中,输入参数是 Future 实例,然后 arg.result() 在使用之前。
如果你想在计算过程中修改计算那么我推荐futures接口。
有些对象有一个复杂的计算系统。 难点在于有些计算是分组计算
这可以通过下面的例子来证明:
from dask distributed import client
def load_data_from_db(id):
# load some data
...
return data
def task_a(data):
# some calculations
...
return result
def group_task(*args):
# some calculations
...
return result
def task_b(data, group_data):
# some calculations
...
return result
def task_c(data, task_a_result)
# some calculations
...
return result
ids = [1, 2]
dsk = {'id_{}'.format(i): id for i, id in enumerate(ids)}
dsk['data_0'] = (load_data_from_db, 'id_0')
dsk['data_1'] = (load_data_from_db, 'id_1')
dsk['task_a_result_0'] = (task_a, 'data_0')
dsk['task_a_result_1'] = (task_a, 'data_1')
dsk['group_result'] = (
group_task,
'data_0', 'task_a_result_0',
'data_1', 'task_a_result_1')
dsk['task_b_result_0'] = (task_b, 'data_0', 'group_result')
dsk['task_b_result_1'] = (task_b, 'data_1', 'group_result')
dsk['task_c_result_0'] = (task_c, 'data_0', 'task_a_result_0')
dsk['task_c_result_1'] = (task_c, 'data_1', 'task_a_result_1')
client = Client(scheduler_address)
result = client.get(
dsk,
['task_a_result_0',
'task_b_result_0',
'task_c_result_0',
'task_a_result_1',
'task_b_result_1',
'task_c_result_1'])
统计的对象列表有千个元素,任务个数有几十个(包括几个组任务)。
使用这种图创建方法很难修改图(添加新任务、更改依赖项等)。 对于这些上下文,是否有使用 dask 的更有效的分布式计算方法?
已添加
与期货图表是:
client = Client(scheduler_address)
ids = [1, 2]
data = client.map(load_data_from_db, ids)
result_a = client.map(task_a, data)
group_args = list(chain(*zip(data, result_a)))
result_group = client.submit(task_group, *group_args)
result_b = client.map(task_b, data, [result_group] * len(ids))
result_c = client.map(task_c, data, result_a)
result = client.gather(result_a + result_b + result_c)
并且在任务函数中,输入参数是 Future 实例,然后 arg.result() 在使用之前。
如果你想在计算过程中修改计算那么我推荐futures接口。