我收集了一些期货,这些期货是坚持使用 dask 数据框的结果。如何对它们进行延迟操作?
I have collection of futures which are result of persist on dask dataframe. How to do a delayed operation on them?
我已经设置了一个调度程序和 4 个工作节点来对 csv 进行一些处理。 csv 的大小仅为 300 mb。
df = dd.read_csv('/Downloads/tmpcrnin5ta',assume_missing=True)
df = df.groupby(['col_1','col_2']).agg('mean').reset_index()
df = client.persist(df)
def create_sep_futures(symbol,df):
symbol_df = copy.deepcopy(df[df['symbol' == symbol]])
return symbol_df
lazy_values = [delayed(create_sep_futures)(symbol, df) for symbol in st]
future = client.compute(lazy_values)
result = client.gather(future)
第一个列表包含 1000 个元素
当我这样做时,我得到这个错误:
distributed.worker - WARNING - Compute Failed
Function: create_sep_futures
args: ('PHG', symbol col_3 col_2 \
0 A 1.451261e+09 23.512857
1 A 1.451866e+09 23.886857
2 A 1.452470e+09 25.080429
kwargs: {}
Exception: KeyError(False,)
我的假设是工作人员应该获得完整的数据框并对其进行查询。但我认为它只是得到了阻止并尝试这样做。
它的解决方法是什么?由于数据框块已经在工人的记忆中。我不想将数据框移动到每个工人。
数据帧上的操作,使用数据帧语法和 API,默认情况下是惰性的(延迟的),您不需要做更多的事情。
第一个问题:你的语法错误df[df['symbol' == symbol]]
=> df[df['symbol'] == symbol]
。这就是False
键的由来。
所以您可能正在寻找的解决方案:
future = client.compute(df[df['symbol'] == symbol])
如果您 想要单独处理块,您可以查看 df.map_partitions
,它与普通函数一起使用并负责传递数据或delayed/futures 或 df.to_delayed
,这将为您提供一组可与延迟函数一起使用的延迟对象。
我已经设置了一个调度程序和 4 个工作节点来对 csv 进行一些处理。 csv 的大小仅为 300 mb。
df = dd.read_csv('/Downloads/tmpcrnin5ta',assume_missing=True)
df = df.groupby(['col_1','col_2']).agg('mean').reset_index()
df = client.persist(df)
def create_sep_futures(symbol,df):
symbol_df = copy.deepcopy(df[df['symbol' == symbol]])
return symbol_df
lazy_values = [delayed(create_sep_futures)(symbol, df) for symbol in st]
future = client.compute(lazy_values)
result = client.gather(future)
第一个列表包含 1000 个元素
当我这样做时,我得到这个错误:
distributed.worker - WARNING - Compute Failed
Function: create_sep_futures
args: ('PHG', symbol col_3 col_2 \
0 A 1.451261e+09 23.512857
1 A 1.451866e+09 23.886857
2 A 1.452470e+09 25.080429
kwargs: {}
Exception: KeyError(False,)
我的假设是工作人员应该获得完整的数据框并对其进行查询。但我认为它只是得到了阻止并尝试这样做。
它的解决方法是什么?由于数据框块已经在工人的记忆中。我不想将数据框移动到每个工人。
数据帧上的操作,使用数据帧语法和 API,默认情况下是惰性的(延迟的),您不需要做更多的事情。
第一个问题:你的语法错误df[df['symbol' == symbol]]
=> df[df['symbol'] == symbol]
。这就是False
键的由来。
所以您可能正在寻找的解决方案:
future = client.compute(df[df['symbol'] == symbol])
如果您 想要单独处理块,您可以查看 df.map_partitions
,它与普通函数一起使用并负责传递数据或delayed/futures 或 df.to_delayed
,这将为您提供一组可与延迟函数一起使用的延迟对象。