Trigger Dask workers to release memory
I am using Dask to distribute the computation of some functions. My general layout looks like this:
from dask.distributed import Client, LocalCluster, as_completed

cluster = LocalCluster(processes=config.use_dask_local_processes,
                       n_workers=1,
                       threads_per_worker=1,
                       )
client = Client(cluster)
cluster.scale(config.dask_local_worker_instances)

fcast_futures = []
# For each group, submit the work
for group in groups:
    fcast_futures.append(client.submit(_work, group))
# Wait until the work is done
for done_work in as_completed(fcast_futures, with_results=False):
    try:
        result = done_work.result()
    except Exception as error:
        log.exception(error)
My problem is that with a large number of jobs I tend to hit the memory limit. I see a lot of:
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 1.15 GB -- Worker memory limit: 1.43 GB
It seems that the futures are not releasing their memory. How can I trigger that? I am using dask==1.2.0 on Python 2.7.
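The scheduler keeps each result alive in the cluster for as long as some client holds a future pointing to it. The memory is released when (or shortly after) the last such future is garbage-collected by Python. In your case, you keep all of your futures in a list for the whole computation, so none of the results can be freed until the end. You could try modifying your loop: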
for done_work in as_completed(fcast_futures, with_results=False):
    try:
        result = done_work.result()
    except Exception as error:
        log.exception(error)
    done_work.release()
Or replace the as_completed loop with something that explicitly removes the futures from the list once they have been processed.
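The second option (removing futures from the list after processing) could look roughly like the minimal sketch below. It swaps the as_completed loop for distributed.wait(..., return_when="FIRST_COMPLETED") and keeps only unfinished futures in the list. The names _work, run_groups and handle are hypothetical stand-ins for the question's task function and per-result processing, and the sketch assumes a recent dask.distributed on Python 3; it has not been checked against dask 1.2.0 on Python 2.7.

```python
import logging

from dask.distributed import Client, wait

log = logging.getLogger(__name__)


def _work(group):
    # Hypothetical stand-in for the real per-group task.
    return sum(group)


def run_groups(client, groups, handle):
    """Submit one task per group and drop each future once it is handled.

    `handle` is a hypothetical callback that receives each result.
    """
    pending = [client.submit(_work, group) for group in groups]
    while pending:
        # Block until at least one of the pending futures has finished.
        done, not_done = wait(pending, return_when="FIRST_COMPLETED")
        for future in done:
            try:
                handle(future.result())
            except Exception as error:
                log.exception(error)
            # Tell the scheduler this client no longer needs the result.
            future.release()
        # Keep only the unfinished futures; handled ones leave the list.
        pending = list(not_done)


if __name__ == "__main__":
    # Small in-process cluster, just for trying the pattern out.
    client = Client(processes=False, n_workers=1, threads_per_worker=1)
    results = []
    run_groups(client, [[1, 2], [3, 4], [5]], results.append)
    print(sorted(results))  # prints [3, 5, 7]
    client.close()
```

Because each finished future is both released and dropped from pending, the cluster can free its result shortly after it has been handled, instead of holding every result until the whole run is over.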