最后冻结两个 Dask 数组的计算

Computation of two Dask arrays freezing at the end

我生成了两个长度为 450,000,000 的随机 dask 数组,我想将它们彼此相除。当我去计算它们时,计算总是在最后冻结。

我有一个8核32GB实例运行运行代码

我试过下面的代码,我试过的一些修改没有在 x 或 y 中保留数据。

x = da.random.random(450000000, chunks=(10000,))
x = client.persist(x)
z1 = dd.from_array(x)

y = da.random.random(450000000, chunks=(10000,))
y = client.persist(y)
z2 = dd.from_array(y)

flux_ratio_sq = z1.div(z2)
flux_ratio_sq.compute() 

我得到的实际结果是 persist 将 x 和 y 保存在内存中(总共 8GB 内存),这是预期的,然后计算会增加更多内存。我得到的一些错误如下。

很多这样的错误:

distributed.core - INFO - Event loop was unresponsive in Scheduler for 
3.74s.  This is often caused by long-running GIL-holding functions 
or moving large chunks of data. This can cause timeouts and instability.

tornado.application - ERROR - Exception in callback <bound method 
BokehTornado._keep_alive of <bokeh.server.tornado.BokehTornado 
object at 0x7fb48562a4a8>>

raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed

我希望最终结果在 dask 系列中,以便我可以将它与我现有的数据合并。

我将尝试在这里扩展我的评论。拳头:给定的 numpypandas 表现更好(DataFrameSeries),最好使用 numpy 进行计算,然后将结果附加到 DataFrame 如果需要的话。使用 Dask 是完全一样的。在 documentation 之后的第二个你应该坚持只在你需要多次调用同一个数据帧的情况下。

所以对于你的具体问题,你可以做的是

import dask.array as da
N = int(4.5e7)

x = da.random.random(N, chunks=(10000,))
y = da.random.random(N, chunks=(10000,))
flux_ratio_sq = da.divide(x, y).compute()

附录:对于 dask.dataframe,您可以使用 to_parquet() 而不是 compute(),并将结果存储到文件中。在像这样令人尴尬的并行问题中,对 RAM 的影响小于使用 compute()。知道类似的东西是否适用于 dask.array

会很有趣