计算图期间 Dask 广播不可用

Dask broadcast not available during compute graph

我正在试验 Dask 并希望将查找 pandas.DataFrame 发送到所有工作节点。不幸的是,它失败了:

TypeError: ("'Future' object is not subscriptable", 'occurred at index 0')

当使用 lookup.result()['foo'].iloc[2] 而不是 lookup['baz'].iloc[2] 时,它工作正常但是:对于输入数据帧的较大实例,它似乎一次又一次地停留在 from_pandas。此外,未来需要手动阻塞(应用操作中的每一行一遍又一遍)似乎很奇怪。有没有办法为每个工作节点阻塞一次未来?一个天真的改进可能是使用 map_partitions,但这只有在分区数量相当小的情况下才可行。

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

df_first = pd.DataFrame({'foo':[1,2,3]})
df_second = pd.DataFrame({'bar':[1,2,3], 'baz':[1,2,3]})

df_first_scattered = client.scatter(df_first, broadcast=True)
df_second_dask = dd.from_pandas(df_second, npartitions=2)


def foo(row, lookup):
    # TODO some computation which relies on the lookup
    return lookup['foo'].iloc[2]

df_second_dask['foo'] = df_second_dask.apply(lambda x: foo(x, df_first_scattered), axis = 1, meta=('baz', 'int64'))
df_second_dask = df_second_dask.compute()
df_second_dask.head()

事实上,对于较大的问题实例,这种天真的 dask 实现似乎比普通的 pandas 慢。我怀疑执行性能慢与上面提出的问题有关。

而不是这个:

df_second_dask['foo'] = df_second_dask.apply(lambda x: foo(x, df_first_scattered), axis = 1, meta=('baz', 'int64'))

试试这个:

df_second_dask['foo'] = df_second_dask.apply(foo, args=[df_first_scattered], axis = 1, meta=('baz', 'int64'))

之前您将 future 隐藏在 lambda 函数中。 Dask 无法找到它以将其转换为正确的值。相反,当我们将 future 作为适当的参数传递时,Dask 能够识别它的本质并正确地为您提供价值。