复杂dask图创建的最简单方法

Simplest way complex dask graph creation

有些对象有一个复杂的计算系统。 难点在于有些计算是分组计算

这可以通过下面的例子来证明:

from dask distributed import client

def load_data_from_db(id):
    # load some data
    ...
    return data

def task_a(data):
    # some calculations
    ...
    return result

def group_task(*args):
    # some calculations
    ...
    return result

def task_b(data, group_data):
    # some calculations
    ...
    return result

def task_c(data, task_a_result)
    # some calculations
    ...
    return result

ids = [1, 2]
dsk = {'id_{}'.format(i): id for i, id in enumerate(ids)}

dsk['data_0'] = (load_data_from_db, 'id_0')
dsk['data_1'] = (load_data_from_db, 'id_1')

dsk['task_a_result_0'] = (task_a, 'data_0')
dsk['task_a_result_1'] = (task_a, 'data_1')

dsk['group_result'] = (
    group_task,
    'data_0', 'task_a_result_0',
    'data_1', 'task_a_result_1')

dsk['task_b_result_0'] = (task_b, 'data_0', 'group_result')
dsk['task_b_result_1'] = (task_b, 'data_1', 'group_result')

dsk['task_c_result_0'] = (task_c, 'data_0', 'task_a_result_0')
dsk['task_c_result_1'] = (task_c, 'data_1', 'task_a_result_1')

client = Client(scheduler_address)
result = client.get(
    dsk,
    ['task_a_result_0',
     'task_b_result_0',
     'task_c_result_0',
     'task_a_result_1',
     'task_b_result_1',
     'task_c_result_1'])

统计的对象列表有千个元素,任务个数有几十个(包括几个组任务)。

使用这种图创建方法很难修改图(添加新任务、更改依赖项等)。 对于这些上下文,是否有使用 dask 的更有效的分布式计算方法?

已添加

期货图表是:

client = Client(scheduler_address)

ids = [1, 2]
data = client.map(load_data_from_db, ids)

result_a = client.map(task_a, data)

group_args = list(chain(*zip(data, result_a)))
result_group = client.submit(task_group, *group_args)

result_b = client.map(task_b, data, [result_group] * len(ids))

result_c = client.map(task_c, data, result_a)

result = client.gather(result_a + result_b + result_c)

并且在任务函数中,输入参数是 Future 实例,然后 arg.result() 在使用之前。

如果你想在计算过程中修改计算那么我推荐futures接口。