工作完成后终止 dask worker
Terminating dask workers after jobs are done
我正在尝试在集群上使用 dask,我有兴趣在所有工作完成后立即终止所有工作人员。
我试图用 retire_workers 方法做到这一点,但这似乎并没有杀死工人。
这是一个例子。
import time
import os
from dask.distributed import Client
def long_func(x):
time.sleep(2)
return 1
if __name__ == '__main__':
C = Client(scheduler_file='sched.json')
res = []
for _ in range(10):
res.append(C.submit(long_func, _))
for r in res:
r.result()
workers = list(C.scheduler_info()['workers'])
# C.run(lambda: os._exit(0), workers=workers)
C.retire_workers(workers=workers, close_workers=True)
调度程序和工作程序是使用这些命令启动的:
dask-scheduler --scheduler-file sched.json
dask-worker --scheduler-file sched.json --nthreads=1 --lifetime='5minutes'
原本希望在执行完上面的python代码后,worker会终止(20秒后),但它并没有终止,停留了整整5分钟。有什么解决方法的建议吗?
这将关闭连接的调度程序并退出工作人员:
C.shutdown()
我建议使用上下文管理器来管理集群——它既漂亮又干净。在本地工作时,我遇到过 RAM 内存用尽并使我的计算机停止运行的问题,但这是我经常使用的示例:
# start our Dask cluster
from dask.distributed import Client,LocalCluster
if __name__ == '__main__':
cluster = LocalCluster()
with Client(cluster) as client:
print("scheduler host: ", client.scheduler.address)
# do some stuff
我正在尝试在集群上使用 dask,我有兴趣在所有工作完成后立即终止所有工作人员。 我试图用 retire_workers 方法做到这一点,但这似乎并没有杀死工人。 这是一个例子。
import time
import os
from dask.distributed import Client
def long_func(x):
time.sleep(2)
return 1
if __name__ == '__main__':
C = Client(scheduler_file='sched.json')
res = []
for _ in range(10):
res.append(C.submit(long_func, _))
for r in res:
r.result()
workers = list(C.scheduler_info()['workers'])
# C.run(lambda: os._exit(0), workers=workers)
C.retire_workers(workers=workers, close_workers=True)
调度程序和工作程序是使用这些命令启动的:
dask-scheduler --scheduler-file sched.json
dask-worker --scheduler-file sched.json --nthreads=1 --lifetime='5minutes'
原本希望在执行完上面的python代码后,worker会终止(20秒后),但它并没有终止,停留了整整5分钟。有什么解决方法的建议吗?
这将关闭连接的调度程序并退出工作人员:
C.shutdown()
我建议使用上下文管理器来管理集群——它既漂亮又干净。在本地工作时,我遇到过 RAM 内存用尽并使我的计算机停止运行的问题,但这是我经常使用的示例:
# start our Dask cluster
from dask.distributed import Client,LocalCluster
if __name__ == '__main__':
cluster = LocalCluster()
with Client(cluster) as client:
print("scheduler host: ", client.scheduler.address)
# do some stuff