我收集了一些期货,这些期货是坚持使用 dask 数据框的结果。如何对它们进行延迟操作?

I have collection of futures which are result of persist on dask dataframe. How to do a delayed operation on them?

我已经设置了一个调度程序和 4 个工作节点来对 csv 进行一些处理。 csv 的大小仅为 300 mb。

df = dd.read_csv('/Downloads/tmpcrnin5ta',assume_missing=True)

df = df.groupby(['col_1','col_2']).agg('mean').reset_index()
df = client.persist(df)



def create_sep_futures(symbol,df):    

     symbol_df = copy.deepcopy(df[df['symbol' == symbol]])

     return symbol_df
lazy_values = [delayed(create_sep_futures)(symbol, df) for symbol in st]

 future = client.compute(lazy_values)
 result = client.gather(future)

第一个列表包含 1000 个元素

当我这样做时,我得到这个错误:

 distributed.worker - WARNING -  Compute Failed
 Function:  create_sep_futures
 args:      ('PHG',       symbol  col_3  col_2  \
 0                A            1.451261e+09                23.512857   
 1                A            1.451866e+09                23.886857   
 2                A            1.452470e+09                25.080429   

 kwargs:    {}
 Exception: KeyError(False,)

我的假设是工作人员应该获得完整的数据框并对其进行查询。但我认为它只是得到了阻止并尝试这样做。

它的解决方法是什么?由于数据框块已经在工人的记忆中。我不想将数据框移动到每个工人。

数据帧上的操作,使用数据帧语法和 API,默认情况下是惰性的(延迟的),您不需要做更多的事情。

第一个问题:你的语法错误df[df['symbol' == symbol]] => df[df['symbol'] == symbol]。这就是False键的由来。

所以您可能正在寻找的解决方案:

future = client.compute(df[df['symbol'] == symbol])

如果您 想要单独处理块,您可以查看 df.map_partitions,它与普通函数一起使用并负责传递数据或delayed/futures 或 df.to_delayed,这将为您提供一组可与延迟函数一起使用的延迟对象。